feat(enterprise): add data drains for continuous export to S3 / webhook #4440
waleedlatif1 merged 5 commits into staging
PR Summary
High Risk
Overview
Introduces new persistence and orchestration: …
Adds delivery implementations with security controls: S3 writes with deterministic keys + …
Reviewed by Cursor Bugbot for commit abdb0b6.
Force-pushed: 53850b2 to db5d298
Greptile Summary
This PR introduces a full data-drains subsystem for continuous export of workflow logs, job logs, audit logs, copilot chats, and copilot runs to customer-owned S3 buckets or HTTPS webhooks on hourly or daily schedules. It pairs with data retention so customers can drain into long-term storage before Sim deletes records.
Confidence Score: 5/5
Safe to merge; all blocking issues from prior review rounds have been addressed and the remaining observations are non-critical.
The implementation is thorough: at-least-once delivery with correct cursor semantics, abort signal propagated end-to-end through HTTP sockets, SSRF validation at both schema and DNS layers, atomic conditional claim preventing duplicate dispatch, orphan reaper for worker crashes, and enterprise-plan + feature-flag gates on all read and write paths.
Remaining observations: apps/sim/lib/data-drains/sources/audit-logs.ts (null-workspace query branch has no JSON index, may slow as audit volume grows) and apps/sim/lib/data-drains/dispatcher.ts (enqueue failures leave candidates count inconsistent with dispatched+skipped).
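As an illustration of the schema-layer half of the SSRF validation mentioned above, here is a minimal sketch. It is not the PR's code: `isAllowedWebhookUrl` and `isPrivateIPv4` are hypothetical names, the blocked ranges are a common baseline rather than the PR's actual list, and the DNS-layer check (resolving the hostname and re-checking the resolved address) is deliberately left out.

```typescript
import { isIP } from 'node:net'

// Hypothetical helper: true for loopback, link-local, and RFC 1918 IPv4 ranges.
function isPrivateIPv4(ip: string): boolean {
  const parts = ip.split('.').map(Number)
  const a = parts[0] ?? 0
  const b = parts[1] ?? 0
  if (a === 0 || a === 10 || a === 127) return true
  if (a === 169 && b === 254) return true
  if (a === 172 && b >= 16 && b <= 31) return true
  if (a === 192 && b === 168) return true
  return false
}

// Hypothetical schema-layer check: HTTPS only, no localhost, no private IP literals.
// Assumption: this sketch rejects every IPv6 literal to stay short.
function isAllowedWebhookUrl(raw: string): boolean {
  let url: URL
  try {
    url = new URL(raw)
  } catch {
    return false
  }
  if (url.protocol !== 'https:') return false
  // URL.hostname keeps the brackets around IPv6 literals; strip them first.
  const host = url.hostname.replace(/^\[|\]$/g, '')
  if (host === 'localhost' || host.endsWith('.localhost')) return false
  const family = isIP(host)
  if (family === 6) return false
  if (family === 4) return !isPrivateIPv4(host)
  return true
}
```

A hostname that passes this check can still resolve to a private address, which is why a second check at DNS-resolution time is needed as well.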
Sequence Diagram
sequenceDiagram
    participant Cron as Cron (hourly)
    participant Dispatcher as dispatchDueDrains
    participant DB as PostgreSQL
    participant Queue as Job Queue
    participant Runner as run-data-drain task
    participant Source as DrainSource
    participant Dest as DrainDestination
    Cron->>Dispatcher: tick
    Dispatcher->>DB: reapOrphanedRuns
    Dispatcher->>DB: SELECT due drains
    loop per candidate
        Dispatcher->>DB: Enterprise plan check
        Dispatcher->>DB: Conditional UPDATE lastRunAt=now
        Dispatcher->>Queue: enqueue run-data-drain
    end
    Queue->>Runner: execute
    Runner->>DB: INSERT dataDrainRuns running
    Runner->>Source: pages(cursor, signal)
    loop per chunk
        Source->>DB: SELECT rows WHERE cursor
        Source-->>Runner: chunk[]
        Runner->>Dest: deliver(body, signal)
        Dest-->>Runner: locator
    end
    alt signal.aborted
        Runner->>DB: UPDATE status=failed
    else success
        Runner->>DB: TRANSACTION cursor + status=success
    end
    Runner->>Dest: session.close()
Reviews (20): Last reviewed commit: "test(data-drains): drift guard ensures e..."
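The runner loop in the sequence diagram implies at-least-once delivery: the cursor is persisted only when the whole run succeeds, so a failed run re-delivers chunks that already went out. The following is a synchronous, in-memory sketch of that behavior. `Row`, `pages`, and `runDrainOnce` are hypothetical stand-ins (the real source and destination are presumably async and database-backed), and rows are assumed to be ordered by an increasing `id`.

```typescript
// Hypothetical in-memory model; none of these types come from the PR.
type Row = { id: number; payload: string }
type RunResult = {
  status: 'success' | 'failed'
  cursor: number // cursor value that would be persisted after this run
  delivered: number[][] // ids of each chunk handed to the destination
}

// Yields rows after the cursor in fixed-size chunks (assumes rows sorted by id).
function* pages(rows: Row[], cursor: number, chunkSize: number): Generator<Row[]> {
  const pending = rows.filter((row) => row.id > cursor)
  for (let i = 0; i < pending.length; i += chunkSize) {
    yield pending.slice(i, i + chunkSize)
  }
}

function runDrainOnce(
  rows: Row[],
  committedCursor: number,
  deliver: (chunk: Row[]) => void,
  chunkSize = 2
): RunResult {
  let cursor = committedCursor
  const delivered: number[][] = []
  try {
    for (const chunk of pages(rows, committedCursor, chunkSize)) {
      deliver(chunk) // may throw, e.g. on a destination error or abort
      delivered.push(chunk.map((row) => row.id))
      for (const row of chunk) cursor = row.id
    }
  } catch {
    // Failure: keep the previously committed cursor, so the next run
    // starts over and re-sends chunks that were already delivered.
    return { status: 'failed', cursor: committedCursor, delivered }
  }
  // Success: the advanced cursor and the success status are committed together.
  return { status: 'success', cursor, delivered }
}
```

Because a failed run can leave already-delivered chunks at the destination, receivers in this model need to tolerate duplicates, for example by deduplicating on row id.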
@greptile
@cursor review
✅ Bugbot reviewed your changes and found no new issues!
Reviewed by Cursor Bugbot for commit 62c8b53.
…te baseline Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Force-pushed: 62c8b53 to ccd1695
…orced Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…sim-signature Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Asserts that any header buildHeaders writes is rejected when reused as a custom signatureHeader. Adding a new metadata header without mirroring it into RESERVED_SIGNATURE_HEADER_NAMES now fails CI. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
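A drift guard of the kind this commit describes could look roughly like the sketch below. Only the names `buildHeaders`, `RESERVED_SIGNATURE_HEADER_NAMES`, and `X-Sim-Signature-Version` appear in this PR. The header list, `buildMetadataHeaders`, `isAllowedSignatureHeader`, `findUnreservedHeaders`, and the `X-Sim-Drain-Id` header are assumptions made for illustration.

```typescript
// Assumed contents; the real reserved list is not shown on this page.
const RESERVED_SIGNATURE_HEADER_NAMES = new Set<string>([
  'content-type',
  'x-sim-signature-version',
  'x-sim-drain-id', // hypothetical metadata header
])

// Hypothetical stand-in for the metadata part of buildHeaders.
function buildMetadataHeaders(drainId: string): Record<string, string> {
  return {
    'Content-Type': 'application/json',
    'X-Sim-Signature-Version': 'v1',
    'X-Sim-Drain-Id': drainId,
  }
}

// A custom signatureHeader is acceptable only if it cannot collide
// with a header the sender already writes.
function isAllowedSignatureHeader(name: string): boolean {
  return !RESERVED_SIGNATURE_HEADER_NAMES.has(name.trim().toLowerCase())
}

// Drift guard: every header the builder writes must be rejected as a custom
// signature header. A non-empty result means the reserved list is stale.
function findUnreservedHeaders(): string[] {
  return Object.keys(buildMetadataHeaders('drain_123')).filter(isAllowedSignatureHeader)
}
```

In a test suite the guard reduces to asserting that `findUnreservedHeaders()` returns an empty array, so adding a header to the builder without reserving it fails CI.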
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Reviewed by Cursor Bugbot for commit abdb0b6.
await db
  .update(dataDrains)
  .set({ lastRunAt: candidate.lastRunAt, updatedAt: now })
  .where(and(eq(dataDrains.id, candidate.id), eq(dataDrains.lastRunAt, now)))
Dispatcher claim rollback uses reference-equal Date comparison
Low Severity
The claim step sets lastRunAt: now and the rollback's WHERE clause checks eq(dataDrains.lastRunAt, now) using the same Date object. While this works because drizzle serializes the Date to the same timestamp value, the rollback can silently succeed even when the runDrain service has already started and set its own lastRunAt, reverting a legitimately running drain's timestamp. This happens if queue.enqueue() throws after the job is actually accepted (e.g., network timeout after server-side commit). The service would later overwrite lastRunAt on completion, but there's a brief window where the drain could be re-dispatched by another cron tick.
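The claim and guarded rollback discussed in this comment can be modeled in memory as a compare-and-set on `lastRunAt`. This sketch is not the dispatcher's code: `Drain`, `claim`, and `rollback` are hypothetical, timestamps are plain epoch milliseconds, and the object mutation stands in for a single conditional `UPDATE ... WHERE`.

```typescript
// Hypothetical in-memory row; the real table is dataDrains in PostgreSQL.
type Drain = { id: string; lastRunAt: number }

// Claim: succeeds only if the drain is still due, then stamps lastRunAt = now.
// Stands in for a conditional UPDATE whose WHERE clause repeats the due check.
function claim(drain: Drain, cutoff: number, now: number): boolean {
  if (!(drain.lastRunAt < cutoff)) return false
  drain.lastRunAt = now
  return true
}

// Rollback: restores the previous timestamp only if lastRunAt still holds the
// value this dispatcher wrote. If anything else has touched the row since,
// the rollback is a no-op.
function rollback(drain: Drain, claimedAt: number, previous: number): boolean {
  if (drain.lastRunAt !== claimedAt) return false
  drain.lastRunAt = previous
  return true
}
```

In this model the guard only helps if the runner writes a different `lastRunAt` before the rollback executes. If the job was accepted but the runner has not yet written to the row, the rollback still matches and the drain becomes due again, which is the window the comment describes.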
      and(eq(dataDrains.scheduleCadence, 'hourly'), lt(dataDrains.lastRunAt, hourlyCutoff)),
      and(eq(dataDrains.scheduleCadence, 'daily'), lt(dataDrains.lastRunAt, dailyCutoff))
    )
  )
Dispatcher duePredicate object reused across SELECT and claim UPDATE
Low Severity
The duePredicate Drizzle expression object is built once and reused in both the initial SELECT and each per-candidate claim UPDATE. Drizzle expression objects carry bound parameter references; reusing the same object reference across multiple query builder calls relies on Drizzle treating the object as immutable. If Drizzle ever mutates the SQL node during query compilation (or caches compilation state), the second and subsequent uses of the same duePredicate reference in the claim loop could produce incorrect SQL. Building the predicate fresh per claim would be more defensive.
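The "build it fresh per claim" suggestion amounts to replacing a shared expression object with a small factory. The sketch below models the due check as a plain function over rows instead of a Drizzle SQL expression. `DrainRow`, `Cutoffs`, `cutoffsFor`, and `buildDuePredicate` are hypothetical, and the one-hour and 24-hour cutoffs are an assumption about how `hourlyCutoff` and `dailyCutoff` are derived.

```typescript
type Cadence = 'hourly' | 'daily'
type DrainRow = { scheduleCadence: Cadence; lastRunAt: number }
type Cutoffs = { hourlyCutoff: number; dailyCutoff: number }

const HOUR_MS = 60 * 60 * 1000

// Assumption: a drain is due once a full cadence interval has elapsed.
function cutoffsFor(now: number): Cutoffs {
  return { hourlyCutoff: now - HOUR_MS, dailyCutoff: now - 24 * HOUR_MS }
}

// Factory: every call returns a new predicate, so no object is shared
// between the initial SELECT and the per-candidate claim.
function buildDuePredicate(cutoffs: Cutoffs): (row: DrainRow) => boolean {
  return (row) =>
    (row.scheduleCadence === 'hourly' && row.lastRunAt < cutoffs.hourlyCutoff) ||
    (row.scheduleCadence === 'daily' && row.lastRunAt < cutoffs.dailyCutoff)
}
```

The same shape carries over to a query builder: call the factory once for the SELECT and once inside each claim, so correctness no longer depends on the builder treating expression objects as immutable.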
    headers.Authorization = `Bearer ${input.credentials.bearerToken}`
  }
  return headers
}
Webhook custom signatureHeader overwrites default without trace
Low Severity
In buildHeaders, when a custom signatureHeader is provided, the signature is placed under that key only — the default X-Sim-Signature header disappears entirely. Downstream receivers that hardcode checking X-Sim-Signature would silently see no signature when a custom header is configured. Meanwhile, the X-Sim-Signature-Version header is always sent regardless, creating a mismatch where the version header is present but the signature header it refers to doesn't exist under the documented default name. This is more of a design subtlety than a critical issue but could confuse integrators.
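The behavior described in this comment can be reproduced with a small stand-in for `buildHeaders`. Only the header names `X-Sim-Signature` and `X-Sim-Signature-Version` and the `signatureHeader` and `bearerToken` fields come from this page. The `WebhookInput` shape, the `signingSecret` field, the `'v1'` version value, and the HMAC-SHA256 hex signature over the raw body are assumptions made for illustration.

```typescript
import { createHmac } from 'node:crypto'

// Hypothetical input shape; the PR's real type nests credentials differently.
type WebhookInput = {
  body: string
  signingSecret: string
  signatureHeader?: string
  bearerToken?: string
}

const DEFAULT_SIGNATURE_HEADER = 'X-Sim-Signature'

function buildHeaders(input: WebhookInput): Record<string, string> {
  // Assumed scheme: HMAC-SHA256 over the raw body, hex encoded.
  const signature = createHmac('sha256', input.signingSecret)
    .update(input.body)
    .digest('hex')
  const headers: Record<string, string> = {
    'Content-Type': 'application/json',
    // The version header is always sent, whatever name carries the signature.
    'X-Sim-Signature-Version': 'v1',
  }
  // A custom name replaces the default; the signature is not sent twice.
  headers[input.signatureHeader ?? DEFAULT_SIGNATURE_HEADER] = signature
  if (input.bearerToken) {
    headers.Authorization = `Bearer ${input.bearerToken}`
  }
  return headers
}
```

With `signatureHeader: 'X-Acme-Signature'`, the result contains `X-Acme-Signature` and `X-Sim-Signature-Version` but no `X-Sim-Signature`, so a receiver has to look the signature up under the configured name.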
Summary
DATA_DRAINS_ENABLED/NEXT_PUBLIC_DATA_DRAINS_ENABLED, mirroring data retention
Type of Change
Testing
bun run check:api-validation passing
Checklist